[新機能]クエリサイズを日時で分割しbackfill処理も行いやすいdbt incremental modelの「microbatch」strategyを試してみた

[新機能]クエリサイズを日時で分割しbackfill処理も行いやすいdbt incremental modelの「microbatch」strategyを試してみた

Clock Icon2024.10.11

さがらです。

dbt ver1.9の新機能であるmicrobatch incremental modelsを試してみたので、本記事でその内容をまとめてみます。

microbatch incremental modelsとは

まず、incremental modelsについてですが、dbtで前回更新時からの差分更新を行うことができるmodelとなっています。

https://docs.getdbt.com/docs/build/incremental-models-overview

その上で、このincremental modelsではappendmergedelete+insertinsert_overwriteという4つの増分更新方法がこれまで提供されていました。

https://docs.getdbt.com/docs/build/incremental-strategy

この増分更新方法の新しいオプションとして追加されたのが、microbatchとなります。

https://docs.getdbt.com/docs/build/incremental-microbatch

microbatchでの更新時の挙動ですが、指定したカラムの値を元に日別に分けて、複数の更新処理が走るイメージとなります。(下図は上記のリンク先ドキュメントからの引用です。)

microbatch_filters

また、日別に処理を管理できる仕様のためdbt run・buildのオプションとして下記のように--event-time-start--event-time-endのオプションを設定すると、過去の一区間のデータだけ更新されたときに、その区間だけ差分更新させるということが容易に実現できます。「backfill」と呼ばれる類の処理となります。(下図は上記のリンク先ドキュメントからの引用です。)

これは今までのincremental modelではできなかったことで、ソースデータの誤りを修正した場合やビジネスロジックが変更したときに過去データにも適用させたい場合など、活用場面は多いと思います。

dbt run --event-time-start "2024-09-01" --event-time-end "2024-09-04"

microbatch_backfill

事前準備

データの準備

事前に、以下のクエリを実行し、検証用データを準備しておきます。(DWHはSnowflakeです。クエリはClaude 3.5 Sonnetに作成してもらいました。)

create or replace table ec_sales (
    product_id integer,
    product_name varchar(100),
    quantity integer,
    sale_amount decimal(10,2),
    sold_at timestamp,
    updated_at timestamp default current_timestamp()
);

insert into ec_sales (product_id, product_name, quantity, sale_amount, sold_at, updated_at)
values
    (1001, 'スマートフォン', 2, 159980.00, '2024-10-01 10:30:00'::timestamp, '2024-10-01 11:00:00'::timestamp),
    (1002, 'ワイヤレスイヤホン', 5, 49950.00, '2024-10-01 14:45:00'::timestamp, '2024-10-01 15:00:00'::timestamp),
    (1003, 'ノートパソコン', 1, 129800.00, '2024-10-02 09:15:00'::timestamp, '2024-10-02 09:30:00'::timestamp),
    (1004, 'タブレット', 3, 149970.00, '2024-10-02 16:20:00'::timestamp, '2024-10-02 16:45:00'::timestamp),
    (1005, 'スマートウォッチ', 4, 79960.00, '2024-10-03 11:00:00'::timestamp, '2024-10-03 11:15:00'::timestamp),
    (1006, 'ポータブル充電器', 10, 29900.00, '2024-10-03 15:30:00'::timestamp, '2024-10-03 15:45:00'::timestamp),
    (1007, 'ゲーミングマウス', 2, 15980.00, '2024-10-04 13:45:00'::timestamp, '2024-10-04 14:00:00'::timestamp),
    (1008, 'メカニカルキーボード', 1, 12980.00, '2024-10-04 17:10:00'::timestamp, '2024-10-04 17:30:00'::timestamp),
    (1009, 'ワイヤレススピーカー', 3, 35970.00, '2024-10-05 10:00:00'::timestamp, '2024-10-05 10:15:00'::timestamp),
    (1010, 'アクションカメラ', 2, 79980.00, '2024-10-05 14:30:00'::timestamp, '2024-10-05 14:45:00'::timestamp);

select * from ec_sales order by sold_at, product_id;

2024-10-11_17h31_38

dbt上の定義

dbt上で「source」と「ステージングレイヤーのModelとyaml」も下記のように定義しておきます。

特に、この後作成するIncremental Modelから参照されるModelに対するyamlでconfig: event_time:を設定しておくことが非常に重要です。これがないと、microbatchのmodelを実行した際に、適切にWHERE句が入りません。

  • sources.yml
version: 2

sources:
    - name: microbatch_test
      database: SAGARA_RAWDATA_DB
      tables:
        - name: ec_sales
  • stg_ec_sales.sql
select
    product_id,
    product_name,
    quantity,
    sale_amount,
    sold_at,
    updated_at
from
    {{ source('microbatch_test','ec_sales') }}
  • schema.yml
version: 2

models:
  - name: stg_ec_sales
    config:
      event_time: sold_at

dbt Cloud上でEnviroment Variableを定義 ※Beta期間中のみ

私が試した2024年10月11日時点ではまだBeta版のため、Enviroment Variableを定義してこの機能を有効化しておく必要があります。

具体的には、下図のようにDBT_EXPERIMENTAL_MICROBATCHTrueに設定してください。

2024-10-11_21h35_12

microbatch incremental modelsを定義してbuild

dbt上で下記の内容の新しいModelを作成します。

  • ec_sales_incremental.sql
{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    event_time='sold_at',
    begin='2024-10-01',
    lookback=4,
    batch_size='day'
) }}

select
    product_id,
    product_name,
    quantity,
    sale_amount,
    sold_at,
    updated_at
from
    {{ ref('stg_ec_sales') }}

config内の各パラメータはこのような内容となっています。

  • incremental_strategy:'microbatch'と記入。
  • event_time:このconfigの対象のModelの中で、バッチの粒度を分けたい、DATE型やTIMESTAMP型のカラム名を記入。
  • begin:このincremental modelを初めてビルドする際、どの日付からのレコードをビルドの対象するか記入。
  • lookbackdbt run/buildの実行日から、何日分遡ってビルドを行うか記入。(この記事の後半でもこのlookbackの挙動について実例交えて説明しています。)
    • デフォルトは0で、0にするとdbt run/buildの実行日しかbuild対象とならない。
    • 実行日の起算はUTCベースのため、日本ユーザーの方は日本時間午前9時より前に実行する際はご注意ください。
  • batch_size:microbatchのビルド処理の実行粒度を記入。
    • 2024年10月11日時点ではdayしか選択できない仕様となっているためご注意ください。

この後、dbt run -s +ec_sales_incrementalを実行して一度テーブルを作成してみます。

すると、下図のように実行ログが残ります。これがmicrobatch特有の仕様で、beginパラメータで定義した日付からdbt runを実行した日まで、日別にWHERE句が入ったクエリが実行されます。

2024-10-11_22h02_16

2024-10-11_22h02_45

新しいデータを6日分入れてbuild

この後、下記のSQLをSnowflake上で実行し、6日分の新しいレコードを追加します。

insert into ec_sales (product_id, product_name, quantity, sale_amount, sold_at, updated_at)
values
    (1011, '4Kモニター', 2, 89980.00, '2024-10-06 09:15:00'::timestamp, '2024-10-06 09:30:00'::timestamp),
    (1012, 'ゲーミングヘッドセット', 3, 44970.00, '2024-10-06 14:20:00'::timestamp, '2024-10-06 14:35:00'::timestamp),
    (1013, 'ワイヤレスマウス', 5, 24950.00, '2024-10-07 10:45:00'::timestamp, '2024-10-07 11:00:00'::timestamp),
    (1014, '外付けSSD', 2, 39980.00, '2024-10-07 16:30:00'::timestamp, '2024-10-07 16:45:00'::timestamp),
    (1015, 'ウェブカメラ', 4, 31960.00, '2024-10-08 11:20:00'::timestamp, '2024-10-08 11:35:00'::timestamp),
    (1016, 'ブルートゥーススピーカー', 3, 26970.00, '2024-10-08 15:50:00'::timestamp, '2024-10-08 16:05:00'::timestamp),
    (1017, 'ゲーミングチェア', 1, 49990.00, '2024-10-09 10:30:00'::timestamp, '2024-10-09 10:45:00'::timestamp),
    (1018, 'メカニカルキーボード', 2, 29980.00, '2024-10-09 15:15:00'::timestamp, '2024-10-09 15:30:00'::timestamp),
    (1019, 'ノイズキャンセリングヘッドホン', 3, 89970.00, '2024-10-10 09:45:00'::timestamp, '2024-10-10 10:00:00'::timestamp),
    (1020, 'ポータブルSSD', 4, 59960.00, '2024-10-10 14:20:00'::timestamp, '2024-10-10 14:35:00'::timestamp),
    (1021, 'スマートホームハブ', 2, 39980.00, '2024-10-11 11:10:00'::timestamp, '2024-10-11 11:25:00'::timestamp),
    (1022, 'ドローン', 1, 79990.00, '2024-10-11 16:40:00'::timestamp, '2024-10-11 16:55:00'::timestamp);

この上で、再度dbt run -s +ec_sales_incrementalを実行してみます。

すると、今度は10月7日~10月11日の5日分のデータに対して日別にクエリが実行されました。

これは、sqlファイル上のconfiglookback=4としていたためであり、dbt runの実行日である10月11日から4日分遡って、10月10日、10月9日、10月8日、10月7日、が処理対象となっていたからです。

2024-10-11_22h15_40

このため、先ほど6日分INSERTしたうちの1日目である10月6日分のデータは漏れた形でデータが入っています。(下図参照)

この次の章で、10月6日分が漏れたリカバリを行っていきます。

2024-10-11_22h20_15

10月6日分のデータをリカバリ(backfill)

10月6日分のデータが漏れた分をリカバリするためには、dbt run -s +ec_sales_incremental --event-time-start "2024-10-06" --event-time-end "2024-10-07"という形でリカバリしたい期間を指定してコマンドを実行します。

2024-10-11_22h22_40

このコマンドの実行により、下図のようにクエリが実行されます。

2024-10-11_22h29_00

2024-10-11_22h29_31

2024-10-11_22h31_15

これで、10月6日分のデータがリカバリができました!

2024-10-11_22h32_31

注意点としては、2024年10月11日時点ではTIMESTAMP型のカラムをevent_timeに指定していると、リカバリしたい期間の1日後まで指定しないと正しくWHERE句が指定されません

参考までに、dbt run -s +ec_sales_incremental --event-time-start "2024-10-06" --event-time-end "2024-10-06"を実行すると下図のようなWHERE句でクエリが実行されてしまい、全く意味のないクエリとなってしまいます。

2024-10-11_22h25_24

最後に

dbt ver1.9の新機能であるmicrobatch incremental modelsを試してみました。

個人的に、backfillが簡単にできることがとても気に入りました!これまでのincremental modelだと過去のある1日だけでもデータの変更があると、full-refleshで再度作り直しを行うか、レコードを直接修正、一時的にクエリの内容を修正、などする必要があり大変だったので…

また、日別に細かいクエリを実行する形となるため、例えばSnowflakeでいうとより小さいウェアハウスサイズにできる可能性もあります。マルチクラスターウェアハウスの恩恵も受けやすくなりそうですね。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.